BasePackPushConnection.java

/*
 * Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
 * Copyright (C) 2008, 2022 Shawn O. Pearce <spearce@spearce.org> and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.transport;

import static org.eclipse.jgit.transport.GitProtocolConstants.CAPABILITY_ATOMIC;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.errors.TooLargeObjectInPackException;
import org.eclipse.jgit.errors.TooLargePackException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackWriter;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.transport.RemoteRefUpdate.Status;

/**
 * Push implementation using the native Git pack transfer service.
 * <p>
 * This is the canonical implementation for transferring objects to the remote
 * repository from the local repository by talking to the 'git-receive-pack'
 * service. Objects are packed on the local side into a pack file and then sent
 * to the remote repository.
 * <p>
 * This connection requires only a bi-directional pipe or socket, and thus is
 * easily wrapped up into a local process pipe, anonymous TCP socket, or a
 * command executed through an SSH tunnel.
 * <p>
 * This implementation honors
 * {@link org.eclipse.jgit.transport.Transport#isPushThin()} option.
 * <p>
 * Concrete implementations should just call
 * {@link #init(java.io.InputStream, java.io.OutputStream)} and
 * {@link #readAdvertisedRefs()} methods in constructor or before any use. They
 * should also handle resources releasing in {@link #close()} method if needed.
 */
public abstract class BasePackPushConnection extends BasePackConnection implements
		PushConnection {
	/**
	 * The client expects a status report after the server processes the pack.
	 * @since 2.0
	 */
	public static final String CAPABILITY_REPORT_STATUS = GitProtocolConstants.CAPABILITY_REPORT_STATUS;

	/**
	 * The server supports deleting refs.
	 * @since 2.0
	 */
	public static final String CAPABILITY_DELETE_REFS = GitProtocolConstants.CAPABILITY_DELETE_REFS;

	/**
	 * The server supports packs with OFS deltas.
	 * @since 2.0
	 */
	public static final String CAPABILITY_OFS_DELTA = GitProtocolConstants.CAPABILITY_OFS_DELTA;

	/**
	 * The client supports using the 64K side-band for progress messages.
	 * @since 2.0
	 */
	public static final String CAPABILITY_SIDE_BAND_64K = GitProtocolConstants.CAPABILITY_SIDE_BAND_64K;

	/**
	 * The server supports the receiving of push options.
	 * @since 4.5
	 */
	public static final String CAPABILITY_PUSH_OPTIONS = GitProtocolConstants.CAPABILITY_PUSH_OPTIONS;

	private final boolean thinPack;
	private final boolean atomic;

	/** A list of option strings associated with this push. */
	private List<String> pushOptions;

	private boolean capableAtomic;
	private boolean capableDeleteRefs;
	private boolean capableReport;
	private boolean capableSideBand;
	private boolean capableOfsDelta;
	private boolean capablePushOptions;

	private boolean sentCommand;
	private boolean writePack;

	/** Time in milliseconds spent transferring the pack data. */
	private long packTransferTime;

	/**
	 * Create a new connection to push using the native git transport.
	 *
	 * @param packTransport
	 *            the transport.
	 */
	public BasePackPushConnection(PackTransport packTransport) {
		super(packTransport);
		thinPack = transport.isPushThin();
		atomic = transport.isPushAtomic();
		pushOptions = transport.getPushOptions();
	}

	/** {@inheritDoc} */
	@Override
	public void push(final ProgressMonitor monitor,
			final Map<String, RemoteRefUpdate> refUpdates)
			throws TransportException {
		push(monitor, refUpdates, null);
	}

	/** {@inheritDoc} */
	@Override
	public void push(final ProgressMonitor monitor,
			final Map<String, RemoteRefUpdate> refUpdates, OutputStream outputStream)
			throws TransportException {
		markStartedOperation();
		doPush(monitor, refUpdates, outputStream);
	}

	/** {@inheritDoc} */
	@Override
	protected TransportException noRepository(Throwable cause) {
		// Sadly we cannot tell the "invalid URI" case from "push not allowed".
		// Opening a fetch connection can help us tell the difference, as any
		// useful repository is going to support fetch if it also would allow
		// push. So if fetch throws NoRemoteRepositoryException we know the
		// URI is wrong. Otherwise we can correctly state push isn't allowed
		// as the fetch connection opened successfully.
		//
		TransportException te;
		try {
			transport.openFetch().close();
			te = new TransportException(uri, JGitText.get().pushNotPermitted);
		} catch (NoRemoteRepositoryException e) {
			// Fetch concluded the repository doesn't exist.
			te = e;
		} catch (NotSupportedException | TransportException e) {
			te = new TransportException(uri, JGitText.get().pushNotPermitted, e);
		}
		te.addSuppressed(cause);
		return te;
	}

	/**
	 * Push one or more objects and update the remote repository.
	 *
	 * @param monitor
	 *            progress monitor to receive status updates.
	 * @param refUpdates
	 *            update commands to be applied to the remote repository.
	 * @param outputStream
	 *            output stream to write sideband messages to
	 * @throws org.eclipse.jgit.errors.TransportException
	 *             if any exception occurs.
	 * @since 3.0
	 */
	protected void doPush(final ProgressMonitor monitor,
			final Map<String, RemoteRefUpdate> refUpdates,
			OutputStream outputStream) throws TransportException {
		try {
			writeCommands(refUpdates.values(), monitor, outputStream);

			if (pushOptions != null && capablePushOptions)
				transmitOptions();
			if (writePack)
				writePack(refUpdates, monitor);
			if (sentCommand) {
				if (capableReport)
					readStatusReport(refUpdates);
				if (capableSideBand) {
					// Ensure the data channel is at EOF, so we know we have
					// read all side-band data from all channels and have a
					// complete copy of the messages (if any) buffered from
					// the other data channels.
					//
					int b = in.read();
					if (0 <= b) {
						throw new TransportException(uri, MessageFormat.format(
								JGitText.get().expectedEOFReceived,
								Character.valueOf((char) b)));
					}
				}
			}
		} catch (TransportException e) {
			throw e;
		} catch (Exception e) {
			throw new TransportException(uri, e.getMessage(), e);
		} finally {
			if (in instanceof SideBandInputStream) {
				((SideBandInputStream) in).drainMessages();
			}
			close();
		}
	}

	private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
			final ProgressMonitor monitor, OutputStream outputStream) throws IOException {
		final String capabilities = enableCapabilities(monitor, outputStream);
		if (atomic && !capableAtomic) {
			throw new TransportException(uri,
					JGitText.get().atomicPushNotSupported);
		}

		if (pushOptions != null && !capablePushOptions) {
			throw new TransportException(uri,
					MessageFormat.format(JGitText.get().pushOptionsNotSupported,
							pushOptions.toString()));
		}

		for (RemoteRefUpdate rru : refUpdates) {
			if (!capableDeleteRefs && rru.isDelete()) {
				rru.setStatus(Status.REJECTED_NODELETE);
				continue;
			}

			final StringBuilder sb = new StringBuilder();
			ObjectId oldId = rru.getExpectedOldObjectId();
			if (oldId == null) {
				final Ref advertised = getRef(rru.getRemoteName());
				oldId = advertised != null ? advertised.getObjectId() : null;
				if (oldId == null) {
					oldId = ObjectId.zeroId();
				}
			}
			sb.append(oldId.name());
			sb.append(' ');
			sb.append(rru.getNewObjectId().name());
			sb.append(' ');
			sb.append(rru.getRemoteName());
			if (!sentCommand) {
				sentCommand = true;
				sb.append(capabilities);
			}

			pckOut.writeString(sb.toString());
			rru.setStatus(Status.AWAITING_REPORT);
			if (!rru.isDelete())
				writePack = true;
		}

		if (monitor.isCancelled())
			throw new TransportException(uri, JGitText.get().pushCancelled);
		pckOut.end();
		outNeedsEnd = false;
	}

	private void transmitOptions() throws IOException {
		for (String pushOption : pushOptions) {
			pckOut.writeString(pushOption);
		}

		pckOut.end();
	}

	private String enableCapabilities(final ProgressMonitor monitor,
			OutputStream outputStream) {
		final StringBuilder line = new StringBuilder();
		if (atomic)
			capableAtomic = wantCapability(line, CAPABILITY_ATOMIC);
		capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
		capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
		capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA);

		if (pushOptions != null) {
			capablePushOptions = wantCapability(line, CAPABILITY_PUSH_OPTIONS);
		}

		capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K);
		if (capableSideBand) {
			in = new SideBandInputStream(in, monitor, getMessageWriter(),
					outputStream);
			pckIn = new PacketLineIn(in);
		}
		addUserAgentCapability(line);

		if (line.length() > 0)
			line.setCharAt(0, '\0');
		return line.toString();
	}

	private void writePack(final Map<String, RemoteRefUpdate> refUpdates,
			final ProgressMonitor monitor) throws IOException {
		Set<ObjectId> remoteObjects = new HashSet<>();
		Set<ObjectId> newObjects = new HashSet<>();

		try (PackWriter writer = new PackWriter(transport.getPackConfig(),
				local.newObjectReader())) {

			for (Ref r : getRefs()) {
				// only add objects that we actually have
				ObjectId oid = r.getObjectId();
				if (local.getObjectDatabase().has(oid))
					remoteObjects.add(oid);
			}
			remoteObjects.addAll(additionalHaves);
			for (RemoteRefUpdate r : refUpdates.values()) {
				if (!ObjectId.zeroId().equals(r.getNewObjectId()))
					newObjects.add(r.getNewObjectId());
			}

			writer.setIndexDisabled(true);
			writer.setUseCachedPacks(true);
			writer.setUseBitmaps(true);
			writer.setThin(thinPack);
			writer.setReuseValidatingObjects(false);
			writer.setDeltaBaseAsOffset(capableOfsDelta);
			writer.preparePack(monitor, newObjects, remoteObjects);

			OutputStream packOut = out;
			if (capableSideBand) {
				packOut = new CheckingSideBandOutputStream(in, out);
			}
			writer.writePack(monitor, monitor, packOut);

			packTransferTime = writer.getStatistics().getTimeWriting();
		}
	}

	private void readStatusReport(Map<String, RemoteRefUpdate> refUpdates)
			throws IOException {
		final String unpackLine = readStringLongTimeout();
		if (!unpackLine.startsWith("unpack ")) //$NON-NLS-1$
			throw new PackProtocolException(uri, MessageFormat
					.format(JGitText.get().unexpectedReportLine, unpackLine));
		final String unpackStatus = unpackLine.substring("unpack ".length()); //$NON-NLS-1$
		if (unpackStatus.startsWith("error Pack exceeds the limit of")) {//$NON-NLS-1$
			throw new TooLargePackException(uri,
					unpackStatus.substring("error ".length())); //$NON-NLS-1$
		} else if (unpackStatus.startsWith("error Object too large")) {//$NON-NLS-1$
			throw new TooLargeObjectInPackException(uri,
					unpackStatus.substring("error ".length())); //$NON-NLS-1$
		} else if (!unpackStatus.equals("ok")) { //$NON-NLS-1$
			throw new TransportException(uri, MessageFormat.format(
					JGitText.get().errorOccurredDuringUnpackingOnTheRemoteEnd, unpackStatus));
		}

		for (String refLine : pckIn.readStrings()) {
			boolean ok = false;
			int refNameEnd = -1;
			if (refLine.startsWith("ok ")) { //$NON-NLS-1$
				ok = true;
				refNameEnd = refLine.length();
			} else if (refLine.startsWith("ng ")) { //$NON-NLS-1$
				ok = false;
				refNameEnd = refLine.indexOf(' ', 3);
			}
			if (refNameEnd == -1)
				throw new PackProtocolException(MessageFormat.format(JGitText.get().unexpectedReportLine2
						, uri, refLine));
			final String refName = refLine.substring(3, refNameEnd);
			final String message = (ok ? null : refLine
					.substring(refNameEnd + 1));

			final RemoteRefUpdate rru = refUpdates.get(refName);
			if (rru == null)
				throw new PackProtocolException(MessageFormat.format(JGitText.get().unexpectedRefReport, uri, refName));
			if (ok) {
				rru.setStatus(Status.OK);
			} else {
				rru.setStatus(Status.REJECTED_OTHER_REASON);
				rru.setMessage(message);
			}
		}
		for (RemoteRefUpdate rru : refUpdates.values()) {
			if (rru.getStatus() == Status.AWAITING_REPORT)
				throw new PackProtocolException(MessageFormat.format(
						JGitText.get().expectedReportForRefNotReceived , uri, rru.getRemoteName()));
		}
	}

	private String readStringLongTimeout() throws IOException {
		if (timeoutIn == null)
			return pckIn.readString();

		// The remote side may need a lot of time to choke down the pack
		// we just sent them. There may be many deltas that need to be
		// resolved by the remote. Its hard to say how long the other
		// end is going to be silent. Taking 10x the configured timeout
		// or the time spent transferring the pack, whichever is larger,
		// gives the other side some reasonable window to process the data,
		// but this is just a wild guess.
		//
		final int oldTimeout = timeoutIn.getTimeout();
		final int sendTime = (int) Math.min(packTransferTime, 28800000L);
		try {
			int timeout = 10 * Math.max(sendTime, oldTimeout);
			timeoutIn.setTimeout((timeout < 0) ? Integer.MAX_VALUE : timeout);
			return pckIn.readString();
		} finally {
			timeoutIn.setTimeout(oldTimeout);
		}
	}

	/**
	 * Gets the list of option strings associated with this push.
	 *
	 * @return pushOptions
	 * @since 4.5
	 */
	public List<String> getPushOptions() {
		return pushOptions;
	}

	private static class CheckingSideBandOutputStream extends OutputStream {
		private final InputStream in;
		private final OutputStream out;

		CheckingSideBandOutputStream(InputStream in, OutputStream out) {
			this.in = in;
			this.out = out;
		}

		@Override
		public void write(int b) throws IOException {
			write(new byte[] { (byte) b });
		}

		@Override
		public void write(byte[] buf, int ptr, int cnt) throws IOException {
			try {
				out.write(buf, ptr, cnt);
			} catch (IOException e) {
				throw checkError(e);
			}
		}

		@Override
		public void flush() throws IOException {
			try {
				out.flush();
			} catch (IOException e) {
				throw checkError(e);
			}
		}

		private IOException checkError(IOException e1) {
			try {
				in.read();
			} catch (TransportException e2) {
				return e2;
			} catch (IOException e2) {
				return e1;
			}
			return e1;
		}
	}
}